Kafka Connect: Surface commit failures instead of silently swallowing them#16237

Open
yadavay-amzn wants to merge 3 commits into
apache:mainfrom
yadavay-amzn:fix/iceberg_15878

Conversation

@yadavay-amzn
Contributor

@yadavay-amzn yadavay-amzn commented May 7, 2026

Fixes #15878.

Problem

The Kafka Connect Coordinator previously caught Exception around doCommit() and only logged a warning, so when a commit failed (e.g., CommitFailedException from Glue detecting a concurrent table update), the connector stayed RUNNING while silently dropping the data that was in flight.

Fix

Rethrow all exceptions from doCommit() on full-commit failures. Partial-commit failures (triggered by commitState.isCommitTimedOut()) are logged at WARN and swallowed since the coordinator will retry on the next cycle.

This ensures commit failures surface to operators by terminating the coordinator thread, which transitions the Connect task to FAILED via CommitterImpl.processControlEvents() detecting coordinatorThread.isTerminated().

The finally block that calls commitState.endCurrentCommit() is preserved so per-commit state is cleaned up regardless of the outcome.
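
A minimal sketch of the control flow described above, not the actual Coordinator code. The class `CommitSketch`, the `CommitAction` interface, the in-memory `log` list, and the `commitInProgress` flag are hypothetical stand-ins for the coordinator, `doCommit()`, the logger, and the state reset by `commitState.endCurrentCommit()`.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the coordinator's commit path; all names are illustrative.
final class CommitSketch {
  /** Stand-in for the work done by doCommit(partialCommit). */
  interface CommitAction {
    void run(boolean partialCommit);
  }

  // Stand-in for the logger, so the behavior is observable without a logging library.
  final List<String> log = new ArrayList<>();
  // Stand-in for the per-commit state that endCurrentCommit() clears.
  boolean commitInProgress = true;

  void commit(boolean partialCommit, CommitAction doCommit) {
    try {
      doCommit.run(partialCommit);
    } catch (RuntimeException e) {
      if (partialCommit) {
        // Timed-out partial commit: warn and swallow, the next cycle retries.
        log.add("WARN partial commit failed, will retry: " + e);
      } else {
        // Full commit: log and rethrow the original exception, type preserved.
        log.add("ERROR commit failed: " + e);
        throw e;
      }
    } finally {
      // Runs on success, on the swallowed path, and on the rethrow path.
      commitInProgress = false;
    }
  }
}
```

In this sketch a failing `commit(true, ...)` returns normally with one WARN entry, while a failing `commit(false, ...)` throws the original exception to the caller; both leave `commitInProgress` cleared.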

Testing

  • testCommitFailedExceptionPropagates: verifies that CommitFailedException propagates and kills the coordinator on full commit.
  • testCommitError: verifies that IllegalArgumentException (bad partition spec) propagates.
  • testCoordinatorCommittedOffsetValidation: verifies that ValidationException (stale offsets) propagates.
  • Full TestCoordinator suite + Kafka Connect integration tests pass locally (10/10 runs).
  • spotlessCheck + checkstyleMain + checkstyleTest pass.

Contributor

@Baunsgaard Baunsgaard left a comment


Good catch and cleanup.

However, the error logging strategy you are proposing seems to be double-logging every commit failure in CoordinatorThread.run(). I have left some specific suggestions.

@yadavay-amzn
Contributor Author

Thanks @Baunsgaard for taking a look, you're right about the double-logging.
I've pushed an update with your recommended changes, please take a look when you get a chance. Thanks!

Contributor

@Baunsgaard Baunsgaard left a comment


LGTM, left one nit for production code. Tests look fine!

@yadavay-amzn
Contributor Author

yadavay-amzn commented May 11, 2026

Done, removed the comment block.


@laskoviymishka laskoviymishka left a comment


Thanks for tackling #15878, the underlying data-loss bug is real.

Silently swallowing CommitFailedException means Connect can keep the task RUNNING while dropping in-flight data, so the fix direction is right: failures need to surface and move the task to FAILED.

Before merge, I’d resolve a few mismatches between the PR description and the actual diff, since these will be confusing later in git log / blame:

  1. “log at ERROR level ... before rethrowing”
    The code does not rethrow the original exception. It wraps it in a new RuntimeException(String.format(...), e), so CommitFailedException becomes a generic RuntimeException with the original only available as getCause(). That may lose useful signal for operator alerting / log pattern matching. I’d either rethrow e directly after logging, or update the description to say “wrap and rethrow with context.”

  2. “removes the catch-all around doCommit()”
    The catch is not removed; it is narrowed from catch (Exception) to catch (RuntimeException). That may be fine, but the PR should say that explicitly. Otherwise, either keep catch (Exception) or remove the block entirely and rely on the outer catch in CoordinatorThread.run().

  3. CoordinatorThread.run() terminates the thread on uncaught exceptions, which transitions the Kafka Connect task to FAILED
    The end state is right, but the mechanism is different. run() catches Exception, logs, and sets terminated = true. The Connect task transitions later, when CommitterImpl.save() → processControlEvents() sees coordinatorThread.isTerminated() and throws NotRunningException. Worth tightening the description so future readers don’t learn the wrong invariant.

One behavioral change I’d like a Kafka Connect domain opinion on: this also makes the partial-commit path — commit(true), triggered by commitState.isCommitTimedOut() — fatal on any RuntimeException. Previously, a transient blip during partial commit was swallowed and retried; now it terminates the coordinator and needs operator intervention.

Is that the intended trade-off, or should the rethrow be gated on !partialCommit?

@AnatolyPopov, can you take a look? I would value your read on two things:

  • should partial-commit failures be equally fatal?
  • does wrapping vs rethrowing the original CommitFailedException matter for downstream alerting in your deployments?

Inline comments below cover the test-side observations: spy reuse, brittle post-throw assertions, and missing partial-commit test.

@yadavay-amzn
Contributor Author

yadavay-amzn commented May 15, 2026

Thanks for the thorough review @laskoviymishka. Addressed all points:

  1. Rethrow original exception: no longer wrapping in RuntimeException. The original CommitFailedException (or whatever the commit throws) is logged at ERROR and rethrown directly, preserving the type for alerting.

  2. Partial-commit gating: rethrow is now gated on !partialCommit. Transient failures during partial commits are logged but not fatal, matching the previous retry behavior. Only full-commit failures terminate the coordinator.

  3. Updated test assertions: tests now expect the original exception types directly.

Will update the PR description to accurately reflect the narrowed catch and the mechanism for task FAILED transition.


@laskoviymishka laskoviymishka left a comment


Looks good now, all the description/code mismatches are resolved and the partial-commit gate is in.

Small nit: the Testing section still references testCoordinatorWithBadDataFile, but the actually-modified test is testCommitError, worth a quick fix. Plus CI is a bit red.

Approving; will wait for @AnatolyPopov's domain read before merge.

@yadavay-amzn yadavay-amzn force-pushed the fix/iceberg_15878 branch 8 times, most recently from 18292d5 to f2a993a Compare May 16, 2026 09:42
@yadavay-amzn
Contributor Author

@laskoviymishka @Baunsgaard Heads up — I made a change after your approvals that I want to flag:

The exception handling now uses a tiered approach instead of rethrowing everything:

  1. CommitFailedException / CommitStateUnknownException → log + retry (transient conflicts)
  2. Other CleanableFailure exceptions → log + retry (Iceberg's marker for recoverable errors)
  3. Everything else → log ERROR + rethrow (fatal, kills the coordinator)

The reason: rethrowing ALL non-CommitFailedException exceptions was killing the coordinator on transient errors during the integration tests (the commit path can throw various CleanableFailure subtypes during normal catalog contention). The CleanableFailure interface is Iceberg's built-in classification for retryable exceptions, so using it as the boundary felt like the right approach.

CI is now green. Let me know if you'd prefer a different approach — e.g., a consecutive-failure counter that rethrows after N failures regardless of exception type.

.build();

coordinatorTest(ImmutableList.of(badDataFile), ImmutableList.of(), null);
assertThatThrownBy(

coordinatorTest() calls coordinator.process() directly, so it skips the real production path.

In prod the flow is:

CoordinatorThread.run() catches the exception → marks terminated = true → next CommitterImpl.save() calls processControlEvents() → throws NotRunningException.

This test doesn’t cover that. It would still pass even if CoordinatorThread swallowed the failure.

I think we need an end-to-end test that goes through CoordinatorThread + CommitterImpl.
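
To make that gap concrete, here is a rough sketch of the flag-based handoff such a test would need to exercise. It is not the real CoordinatorThread or CommitterImpl: `TerminationSketch`, `CoordinatorLoop`, and the static `save` helper are hypothetical, and `IllegalStateException` stands in for NotRunningException.

```java
// Hypothetical model of the handoff between the coordinator thread and the committer.
final class TerminationSketch {
  /** Stand-in for CoordinatorThread: a failure is caught and recorded in a flag. */
  static final class CoordinatorLoop implements Runnable {
    private final Runnable work;
    private volatile boolean terminated;

    CoordinatorLoop(Runnable work) {
      this.work = work;
    }

    @Override
    public void run() {
      try {
        work.run();
      } catch (Exception e) {
        // The real thread is described as logging here; the flag is what matters.
        terminated = true;
      }
    }

    boolean isTerminated() {
      return terminated;
    }
  }

  /** Stand-in for the save() -> processControlEvents() check on the task thread. */
  static void save(CoordinatorLoop coordinator) {
    if (coordinator.isTerminated()) {
      // Assumption: the real code throws NotRunningException at this point.
      throw new IllegalStateException("Coordinator unexpectedly terminated");
    }
  }
}
```

The exception never crosses threads directly. A test that calls only the commit logic cannot tell whether the flag is ever set or ever read, which is why the failure has to be observed from the `save` side.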

@laskoviymishka

that's quite a swing @yadavay-amzn, and the new strategy changed enough that I need to flip to request some changes / questions.

The new behavior is basically:

“If a commit fails with an Iceberg-shaped exception, WARN and hope the next cycle fixes it. Only fail on unexpected exceptions.”

I don’t think that’s safe. The previous direction was closer to correct:

“If the real commit fails, kill the coordinator. If cleanup/partial commit fails, swallow it.”

Main issues:

  1. CommitStateUnknownException is now swallowed. That one explicitly says retrying can create duplicates. Since the same files can still be retried from commitBuffer, this is exactly the bad case the exception warns about.

  2. CleanableFailure is the wrong boundary. It means “metadata can be cleaned up,” not “safe to retry.” It also includes permanent errors like 400/401/403. With this change, bad credentials can loop forever at WARN, which is pretty close to the original bug.

  3. CommitFailedException is still swallowed. That’s the exact exception from [Kafka Connect] Connector enters silent broken state after CommitFailedException (Glue concurrent update) — no data written, no error surfaced #15878. The “retry next cycle” story depends on a pretty fragile buffer/offset ordering assumption, and if a rebalance happens after offsets are committed, the in-memory state is gone and the failure is invisible.

I’d rather go back to the old shape: rethrow on full commit failure, swallow only the timed-out partial commit cleanup failure.

If we really want retries here, I think we need to explicitly list retryable exception types, bound the retries, and document the buffer lifecycle. Also the PR description drifted from the code again, so that needs an update too.
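
For illustration only, one possible shape for "explicit list plus bounded retries". This is a sketch under assumptions, not a proposal for the actual Coordinator: `BoundedRetrySketch` and its parameters are hypothetical, and which exception types (if any) belong on the list is exactly the open question in this thread.

```java
import java.util.Set;

// Hypothetical retry gate: swallow only allow-listed failures, and only up to a bound.
final class BoundedRetrySketch {
  private final Set<Class<?>> retryable;
  private final int maxConsecutiveFailures;
  private int consecutiveFailures;

  BoundedRetrySketch(Set<Class<?>> retryable, int maxConsecutiveFailures) {
    this.retryable = retryable;
    this.maxConsecutiveFailures = maxConsecutiveFailures;
  }

  /**
   * Runs one commit attempt. Returns true on success and false if the failure
   * was swallowed for a later retry; rethrows the original exception otherwise.
   */
  boolean attempt(Runnable commit) {
    try {
      commit.run();
      consecutiveFailures = 0; // any success resets the streak
      return true;
    } catch (RuntimeException e) {
      consecutiveFailures++;
      // Exact class match only: subclasses are not implicitly retryable.
      boolean listed = retryable.contains(e.getClass());
      if (!listed || consecutiveFailures >= maxConsecutiveFailures) {
        throw e; // unlisted type or bound reached: surface the failure
      }
      return false;
    }
  }
}
```

With a bound of 3, two consecutive listed failures are swallowed and the third propagates; an unlisted exception propagates on its first occurrence.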

@AnatolyPopov — still interested in your read on the worker/coordinator offset ordering, since that’s the key assumption behind “retry next cycle.”

@yadavay-amzn
Contributor Author

yadavay-amzn commented May 17, 2026

@laskoviymishka You're right, the CleanableFailure approach was too permissive. Reverted to the simpler and safer design:

  • Full commit failure: rethrow everything (kills coordinator -> task FAILED)
  • Partial commit failure (timed-out cleanup): WARN + swallow (retry next cycle)

This matches your original direction (please correct me if I'm wrong).

The CleanableFailure detour was me trying to work around a CI integration test failure that turned out to be transient. The test passes 10/10 locally with this approach. PR description updated.

On the test coverage point (coordinatorTest skipping the production CoordinatorThread path), agreed that an end-to-end test through CoordinatorThread + CommitterImpl would be stronger. Happy to add that in this PR or as a follow-up, whichever you prefer.


@laskoviymishka laskoviymishka left a comment


thanks @yadavay-amzn! We’re back on a good track — back on the road to 🚢.

Three small nits I’d like to land before merge, all 1–3 line fixes.

None of them are blocking on their own, but they’re cheap to do in this PR.

UPD: I see there is a follow-up issue, so I'm good with just the three one-liners before merge.
I still strongly believe the end-to-end CoordinatorThread + CommitterImpl test is needed here, but it may be a bigger change.

taskId,
commitState.currentCommitId(),
e);
} catch (RuntimeException e) {

Two small things on this catch block:

SLF4J double-printing the message. Both LOG calls pass e.getMessage() as a {} arg and e as the trailing throwable. SLF4J already prints the exception's message (and full stack) when the last arg is a Throwable, so the message ends up rendered twice in every log line.

taskId asymmetry. It's in the ERROR but dropped from the WARN. In a multi-task cluster you'd lose task identity for retried partial commits.

Both fixed at once:

LOG.warn("Partial commit {} failed for task {}, will retry",
    commitState.currentCommitId(), taskId, e);
...
LOG.error("Commit {} failed for task {}",
    commitState.currentCommitId(), taskId, e);

assertThatThrownBy(
() -> coordinatorTest(ImmutableList.of(badDataFile), ImmutableList.of(), null))
.isInstanceOf(IllegalArgumentException.class)
.hasMessageContaining("Cannot find partition spec");

the prev-prev version of this test also asserted producer.history().hasSize(1) (only the StartCommit event was sent, no CommitToTable) and table.snapshots().isEmpty() (no phantom commit landed). v4 drops both.

The test now only proves an exception is thrown, it would still pass if a future regression sent a CommitToTable event before failing, or somehow committed-then-threw.

Maybe keep side-effect guards:

assertThatThrownBy(...)
    .isInstanceOf(IllegalArgumentException.class)
    .hasMessageContaining("Cannot find partition spec");
assertThat(producer.history()).hasSize(1);
assertThat(table.snapshots()).isEmpty();

ImmutableList.of(),
EventTestUtil.now()))
.isInstanceOf(ValidationException.class)
.hasMessageContaining("stale offsets");

Same shape as the comment above: the earlier version also asserted that, after the stale-offset conflict, the table was not mutated (snapshots().hasSize(2) and offset still {"0":7}).

Those are the actual correctness guarantees of the optimistic-concurrency guard, and they're no longer checked. The exception-type assertion alone would pass even if a future regression committed the row delta before throwing.

Let's keep the post-throw assertions:

assertThatThrownBy(...)
    .isInstanceOf(ValidationException.class)
    .hasMessageContaining("stale offsets");
table.refresh();
assertThat(table.snapshots()).hasSize(2);
assertThat(table.currentSnapshot().summary())
    .containsEntry(OFFSETS_SNAPSHOT_PROP, "{\"0\":7}");

… them

Narrow the catch around doCommit() and rethrow on full-commit
failures. Partial-commit failures (triggered by commit timeout) are
logged at WARN and swallowed since the coordinator will retry on
the next cycle.

This ensures commit failures surface to operators by terminating
the coordinator thread, which transitions the Connect task to FAILED.

Fixes apache#15878
@yadavay-amzn
Contributor Author

Addressed all three nits:

  1. Fixed SLF4J double-printing -- removed e.getMessage() from format args, added taskId to WARN for symmetry.
  2. testCommitError -- added producer.history().hasSize(1) and table.snapshots().isEmpty() assertions.
  3. testCoordinatorCommittedOffsetValidation -- added post-throw assertions that table still has 2 snapshots and offsets unchanged.

Thanks @laskoviymishka!


@laskoviymishka laskoviymishka left a comment


Nice, all clean, nits addressed plus no unexpected scope changes :)

:shipit: LGTM from my side, will wait for a Kafka Connect deep-dive from @AnatolyPopov before merge.

Contributor

@AnatolyPopov AnatolyPopov left a comment


Hi all, and thanks @laskoviymishka for pinging me on this.

Overall LGTM with some comments.

First of all I do not think partial commits need to be fatal but I suspect they can become dominant on overloaded Connect clusters. I think this is out of scope of this PR but as an operator I might want to see a metric for those and this can be done as a follow up if needed.

The exception wrapping does not really matter either; what usually matters is the task status.

I would also consider retrying on some specific exceptions, to avoid the need for operator intervention when a task fails on transient errors, as was discussed before. In any case this is strictly better than what we have now, and retries could be considered as a follow-up.

On the ordering — I think it works for what this PR is fixing. Workers commit source offsets atomically with the DataWritten event in a single producer transaction, and the coordinator only advances the control-topic offset on a successful commit. The two ways this can break are control-topic retention shorter than the recovery window, and orphan-file cleanup running before recovery AFAIU. Both are operator side knobs rather than connector behavior, so I'd consider them as misconfiguration, but maybe worth a doc note to make operators aware.

Thanks @yadavay-amzn for working through this.

@yadavay-amzn
Contributor Author

yadavay-amzn commented May 18, 2026

Thanks @AnatolyPopov for the thorough review and the context on the offset ordering! And thanks @laskoviymishka for driving this to a good shape.

Agreed on the follow-ups, I will create issues to track:

  1. Metrics for partial commit failures (operator visibility)
  2. Bounded retry on transient exceptions (avoid operator intervention for transient errors)

Both are additive and can land independently.


Development

Successfully merging this pull request may close these issues.

[Kafka Connect] Connector enters silent broken state after CommitFailedException (Glue concurrent update) — no data written, no error surfaced

4 participants